package com.bfxy.payb.service.consumer;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.bfxy.payb.entity.PlatformAccount;
import com.bfxy.payb.mapper.PlatformAccountMapper;
import com.bfxy.payb.utils.FastJsonConvertUtil;

@Component
public class PayConsumer {

    // 默認的consumer
    private DefaultMQPushConsumer consumer;

    private static final String NAMESERVER = "192.168.40.201:9876;192.168.40.202:9876;192.168.40.203:9876;192.168.40.204:9876;";

    private static final String CONSUMER_GROUP_NAME = "tx_pay_consumer_group_name";

    // 監聽（监听）生產端的topic，也就是paya的topic
    public static final String TX_PAY_TOPIC = "tx_pay_topic";

    public static final String TX_PAY_TAGS = "pay";

    //
    @Autowired
    private PlatformAccountMapper platformAccountMapper;

    private PayConsumer() {
        try {
            this.consumer = new DefaultMQPushConsumer(CONSUMER_GROUP_NAME);// 實例化consumer
            this.consumer.setConsumeThreadMin(10);// 最小十個線程
            this.consumer.setConsumeThreadMax(30);// 最大
            this.consumer.setNamesrvAddr(NAMESERVER);// NameServer地址
            this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 消息從哪個位置開始消費
            this.consumer.subscribe(TX_PAY_TOPIC, TX_PAY_TAGS);// 訂閱上面監聽的topic和tags
            this.consumer.registerMessageListener(new MessageListenerConcurrently4Pay());// 內部類
            this.consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    // 內部類，第51行
    class MessageListenerConcurrently4Pay implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 一次消費一條，在這裡不是批量消費，批量消費是來了一個消息將消息中的內容批量的發送出去。
            MessageExt msg = msgs.get(0);
            try {
                String topic = msg.getTopic();
                String tags = msg.getTags();
                String keys = msg.getKeys();
                String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);//這裡的body是經過序列化的body
                System.err.println("收到事务消息, topic: " + topic + ", tags: " + tags + ", keys: " + keys + ", body: " + body);

                //	消息一单过来的时候（去重 幂等操作）
                //	数据库主键去重<去重表 keys>
                // 	insert table --> insert ok & primary key（主鍵去重，當數據量不是特別大的時候可以用）
                Map<String, Object> paramsBody = FastJsonConvertUtil.convertJSONToObject(body, Map.class);//轉碼，因為在發消息的時候body進行了編碼
                String userId = (String) paramsBody.get("userId");    // customer userId，用戶的賬號ID
                String accountId = (String) paramsBody.get("accountId");    //customer accountId，用戶的賬戶ID
                String orderId = (String) paramsBody.get("orderId");    // 	统一的订单
                BigDecimal money = (BigDecimal) paramsBody.get("money");    //	当前的收益款

                PlatformAccount pa = platformAccountMapper.selectByPrimaryKey("platform001");    //	当前平台的一个账号
                pa.setCurrentBalance(pa.getCurrentBalance().add(money));//餘額加上收益款
                Date currentTime = new Date();
                pa.setVersion(pa.getVersion() + 1);// 版本號
                pa.setDateTime(currentTime);
                pa.setUpdateTime(currentTime);
                platformAccountMapper.updateByPrimaryKeySelective(pa);// 更新餘額
            } catch (Exception e) {
                e.printStackTrace();
                //msg.getReconsumeTimes();// 同一個消息	重試了多少次
                //	如果处理多次操作还是失败, 记录失败日志（做补偿 回顾 人工处理）
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }


}
